/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.geode.distributed.internal.locks;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;

/**
 * A processor for initializing the ElderState. This may involve sending
 * a message to every existing member to discover what services they have.
 *
 * @since GemFire 4.0
 */
public class ElderInitProcessor extends ReplyProcessor21 {
  private static final Logger logger = LogService.getLogger();
  
  private final HashMap grantors;
  private final HashSet crashedGrantors;
  
  ////////// Public static entry point /////////


  /**
   * Initializes ElderState map by recovering all existing grantors
   * and crashed grantors in the current ds.
   */
  static void init(DM dm, HashMap map) {
    HashSet crashedGrantors = new HashSet();
    if (!dm.isAdam()) {
      Set others = dm.getOtherDistributionManagerIds();
      if (!others.isEmpty()) {
        ElderInitProcessor processor = new ElderInitProcessor(dm, others,
                                                              map,
                                                              crashedGrantors);
        ElderInitMessage.send(others, dm, processor);
        try {
          processor.waitForRepliesUninterruptibly();
        } catch (ReplyException e) {
          e.handleAsUnexpected();
        }
      }
    }
    // always recover from ourself
    GrantorRequestProcessor.readyForElderRecovery(
        dm.getSystem(),
        null, null);
    DLockService.recoverLocalElder(dm, map, crashedGrantors);
    {
      Iterator it = crashedGrantors.iterator();
      while (it.hasNext()) {
        map.put(it.next(), new GrantorInfo(null, 0, 0, true));
      }
    }
  }
  ////////////  Instance methods //////////////
  
  /** Creates a new instance of ElderInitProcessor
   */
  private ElderInitProcessor(DM dm, Set others, HashMap grantors,
                             HashSet crashedGrantors) {
    super(dm/*fix bug 33297*/, others);
    this.grantors = grantors;
    this.crashedGrantors = crashedGrantors;
  }

  /**
   * Note the synchronization; we can only process one response at a time.
   */
  private synchronized void processData(ArrayList rmtGrantors, ArrayList rmtGrantorVersions, ArrayList rmtGrantorSerialNumbers, ArrayList rmtNonGrantors, InternalDistributedMember rmtId) {
    {
      Iterator iterGrantorServices = rmtGrantors.iterator();
      Iterator iterGrantorVersions = rmtGrantorVersions.iterator();
      Iterator iterGrantorSerialNumbers = rmtGrantorSerialNumbers.iterator();
      while (iterGrantorServices.hasNext()) {
        String serviceName = (String)iterGrantorServices.next();
        long versionId = ((Long)iterGrantorVersions.next()).longValue();
        int serialNumber = ((Integer)iterGrantorSerialNumbers.next()).intValue();
        GrantorInfo oldgi = (GrantorInfo)this.grantors.get(serviceName);
        if (oldgi == null
            || oldgi.getVersionId() < versionId) {
          this.grantors.put(serviceName, new GrantorInfo(rmtId, versionId, serialNumber, false));
          this.crashedGrantors.remove(serviceName);
        }
      }
    }
    {
      Iterator it = rmtNonGrantors.iterator();
      while (it.hasNext()) {
        String serviceName = (String)it.next();
        if (!this.grantors.containsKey(serviceName)) {
          this.crashedGrantors.add(serviceName);
        }
      }
    }
  }
  @Override
  public void process(DistributionMessage msg) {
    if (msg instanceof ElderInitReplyMessage) {
      ElderInitReplyMessage eiMsg = (ElderInitReplyMessage)msg;
      processData(eiMsg.getGrantors(), eiMsg.getGrantorVersions(), eiMsg.getGrantorSerialNumbers(), eiMsg.getNonGrantors(), eiMsg.getSender());
    } else {
      Assert.assertTrue(false, "Expected instance of ElderInitReplyMessage but got " + msg.getClass());
    }
    super.process(msg);
  }
  
  ///////////////   Inner message classes  //////////////////
  
  public static final class ElderInitMessage
    extends PooledDistributionMessage implements MessageWithReply
  {
    private int processorId;
    
    protected static void send(Set others,
                             DM dm, 
                             ReplyProcessor21 proc)
    {
      ElderInitMessage msg = new ElderInitMessage();
      msg.processorId = proc.getProcessorId();
      msg.setRecipients(others);
      if (logger.isTraceEnabled(LogMarker.DLS)) {
        logger.trace(LogMarker.DLS, "ElderInitMessage sending {} to {}", msg, others);
      }
      dm.putOutgoing(msg);
    }
  
    @Override
    public int getProcessorId() {
      return this.processorId;
    }

    private void reply(DM dm, ArrayList grantors, ArrayList grantorVersions, ArrayList grantorSerialNumbers, ArrayList nonGrantors) {
      ElderInitReplyMessage.send(this, dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
    }
    
    @Override
    protected void process(DistributionManager dm) {
      ArrayList grantors = new ArrayList();        // svc names grantor for
      ArrayList grantorVersions = new ArrayList(); // grantor versions
      ArrayList grantorSerialNumbers = new ArrayList(); // serial numbers of grantor svcs
      ArrayList nonGrantors = new ArrayList();     // svc names non-grantor for
      if (dm.waitForElder(this.getSender())) {
        GrantorRequestProcessor.readyForElderRecovery(
            dm.getSystem(),
            this.getSender(), null);
        DLockService.recoverRmtElder(grantors, grantorVersions, 
            grantorSerialNumbers, nonGrantors);
        reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
      }
      else
      if (dm.getOtherNormalDistributionManagerIds().isEmpty())
      { // bug 38690
        // Either we're alone (and received a message from an unknown member) 
        // or else we haven't yet processed a view, In either case, we clearly 
        // don't have any grantors, so we return empty lists.
        logger.info(LogMarker.DLS, LocalizedMessage.create(
            LocalizedStrings.ElderInitProcessor__0_RETURNING_EMPTY_LISTS_BECAUSE_I_KNOW_OF_NO_OTHER_MEMBERS, this));
        reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
      }
      else { // TODO make this fine level?
        logger.info(LogMarker.DLS, LocalizedMessage.create(
            LocalizedStrings.ElderInitProcessor_0_DISREGARDING_REQUEST_FROM_DEPARTED_MEMBER, this));
      }
    }
    
    public int getDSFID() {
      return ELDER_INIT_MESSAGE;
    }
    
    @Override
    public void fromData(DataInput in)
    throws IOException, ClassNotFoundException {
      super.fromData(in);
      this.processorId = in.readInt();
    }
    
    @Override
    public void toData(DataOutput out) throws IOException {
      super.toData(out);
      out.writeInt(this.processorId);
    }
    
    @Override
    public String toString() {
      StringBuffer buff = new StringBuffer();
      buff.append("ElderInitMessage (processorId='")
        .append(this.processorId)
        .append(")");
      return buff.toString();
    }
  }
  
  public static final class ElderInitReplyMessage extends ReplyMessage {
    private ArrayList grantors;         // svc names
    private ArrayList grantorVersions;  // grantor version longs
    private ArrayList grantorSerialNumbers; // grantor dls serial number ints
    private ArrayList nonGrantors;      // svc names

    public static void send(MessageWithReply reqMsg, 
                            DM dm,
                            ArrayList grantors, 
                            ArrayList grantorVersions, 
                            ArrayList grantorSerialNumbers, 
                            ArrayList nonGrantors)
    {
      ElderInitReplyMessage m = new ElderInitReplyMessage();
      m.grantors = grantors;
      m.grantorVersions = grantorVersions;
      m.grantorSerialNumbers = grantorSerialNumbers;
      m.nonGrantors = nonGrantors;
      m.processorId = reqMsg.getProcessorId();
      m.setRecipient(reqMsg.getSender());
      dm.putOutgoing(m);
    }

    public ArrayList getGrantors() {
      return this.grantors;
    }
    public ArrayList getGrantorVersions() {
      return this.grantorVersions;
    }
    public ArrayList getGrantorSerialNumbers() {
      return this.grantorSerialNumbers;
    }
    public ArrayList getNonGrantors() {
      return this.nonGrantors;
    }
    
    @Override
    public int getDSFID() {
      return ELDER_INIT_REPLY_MESSAGE;
    }

    @Override
    public void fromData(DataInput in)
    throws IOException, ClassNotFoundException {
      super.fromData(in);
      this.grantors = DataSerializer.readArrayList(in);
      this.grantorVersions = DataSerializer.readArrayList(in);
      this.grantorSerialNumbers = DataSerializer.readArrayList(in);
      this.nonGrantors = DataSerializer.readArrayList(in);
    }
    
    @Override
    public void toData(DataOutput out) throws IOException {
      super.toData(out);
      DataSerializer.writeArrayList(this.grantors, out);
      DataSerializer.writeArrayList(this.grantorVersions, out);
      DataSerializer.writeArrayList(this.grantorSerialNumbers, out);
      DataSerializer.writeArrayList(this.nonGrantors, out);
    }

    @Override
    public String toString() {
      StringBuffer buff = new StringBuffer();
      buff.append("ElderInitReplyMessage")
        .append("; sender=")
        .append(getSender())
        .append("; processorId=")
        .append(super.processorId)
        .append("; grantors=")
        .append(this.grantors)
        .append("; grantorVersions=")
        .append(this.grantorVersions)
        .append("; grantorSerialNumbers=")
        .append(this.grantorSerialNumbers)
        .append("; nonGrantors=")
        .append(this.nonGrantors)
        .append(")");
      return buff.toString();
    }
  }
}

